1   /*
2    * Copyright 2019 the original author or authors.
3    *
4    * Licensed under the Apache License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    *
8    *      https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   */
16  
17  package org.springframework.integration.rsocket;
18  
19  import java.lang.reflect.Method;
20  import java.nio.charset.StandardCharsets;
21  import java.util.Collections;
22  import java.util.HashMap;
23  import java.util.Map;
24  import java.util.function.BiFunction;
25  
26  import org.springframework.context.ApplicationEventPublisher;
27  import org.springframework.context.ApplicationEventPublisherAware;
28  import org.springframework.core.io.buffer.DataBuffer;
29  import org.springframework.lang.Nullable;
30  import org.springframework.messaging.Message;
31  import org.springframework.messaging.MessageHeaders;
32  import org.springframework.messaging.handler.CompositeMessageCondition;
33  import org.springframework.messaging.handler.DestinationPatternsMessageCondition;
34  import org.springframework.messaging.rsocket.RSocketRequester;
35  import org.springframework.messaging.rsocket.annotation.support.RSocketFrameTypeMessageCondition;
36  import org.springframework.messaging.rsocket.annotation.support.RSocketRequesterMethodArgumentResolver;
37  import org.springframework.util.Assert;
38  import org.springframework.util.ReflectionUtils;
39  
40  /**
41   * An {@link IntegrationRSocketMessageHandler} extension for RSocket service side.
42   * <p>
43   * In a plain Spring Integration application instances of this class are created by the
44   * {@link ServerRSocketConnector} internally and a new RSocket server is started over there.
45   * When an existing RSocket server is in use, an instance of this class has to be
46   * provided as a {@link #responder()} into that server and a {@link ServerRSocketConnector}
47   * should accept the same instance as a delegate.
48   *<p>
49   * With a {@link #messageMappingCompatible} option this class also handles
50   * {@link org.springframework.messaging.handler.annotation.MessageMapping} methods,
51   * covering both Spring Integration and standard
52   * {@link org.springframework.messaging.rsocket.annotation.support.RSocketMessageHandler}
53   * functionality.
54   *
55   * @author Artem Bilan
56   *
57   * @since 5.2.1
58   */
59  public class ServerRSocketMessageHandler extends IntegrationRSocketMessageHandler
60  		implements ApplicationEventPublisherAware {
61  
62  	private static final Method HANDLE_CONNECTION_SETUP_METHOD =
63  			ReflectionUtils.findMethod(ServerRSocketMessageHandler.class, "handleConnectionSetup", Message.class);
64  
65  
66  	private final Map<Object, RSocketRequester> clientRSocketRequesters = new HashMap<>();
67  
68  	private BiFunction<Map<String, Object>, DataBuffer, Object> clientRSocketKeyStrategy =
69  			(headers, data) -> data.toString(StandardCharsets.UTF_8);
70  
71  	private ApplicationEventPublisher applicationEventPublisher;
72  
73  	/**
74  	 * Create an service side RSocket message handler instance for delegating
75  	 * to {@link IntegrationRSocketEndpoint} beans and collect {@link RSocketRequester}s
76  	 * from client connections.
77  	 */
78  	public ServerRSocketMessageHandler() {
79  		this(false);
80  	}
81  
82  	/**
83  	 * Create an service side RSocket message handler instance for delegating
84  	 * to {@link IntegrationRSocketEndpoint} beans and collect {@link RSocketRequester}s
85  	 * from client connections.
86  	 * When {@code messageMappingCompatible == true}, this class also handles
87  	 * {@link org.springframework.messaging.handler.annotation.MessageMapping} methods
88  	 * as it is done by the standard
89  	 * {@link org.springframework.messaging.rsocket.annotation.support.RSocketMessageHandler}.
90  	 * @param messageMappingCompatible whether handle also
91  	 * {@link org.springframework.messaging.handler.annotation.MessageMapping}.
92  	 */
93  	public ServerRSocketMessageHandler(boolean messageMappingCompatible) {
94  		super(messageMappingCompatible);
95  	}
96  
97  	/**
98  	 * Configure a {@link BiFunction} to extract a key for mapping connected {@link RSocketRequester}s.
99  	 * Defaults to the {@code destination} a client is connected.
100 	 * @param clientRSocketKeyStrategy the {@link BiFunction} to use.
101 	 */
102 	public void setClientRSocketKeyStrategy(
103 			BiFunction<Map<String, Object>, DataBuffer, Object> clientRSocketKeyStrategy) {
104 
105 		Assert.notNull(clientRSocketKeyStrategy, "'clientRSocketKeyStrategy' must not be null");
106 		this.clientRSocketKeyStrategy = clientRSocketKeyStrategy;
107 	}
108 
109 	/**
110 	 * Get connected {@link RSocketRequester}s mapped by the keys from the connect messages.
111 	 * @return the map of connected {@link RSocketRequester}s.
112 	 * @see #setClientRSocketKeyStrategy
113 	 */
114 	public Map<Object, RSocketRequester> getClientRSocketRequesters() {
115 		return Collections.unmodifiableMap(this.clientRSocketRequesters);
116 	}
117 
118 	/**
119 	 * Obtain a connected {@link RSocketRequester} mapped by provided key or null.
120 	 * @param key the key for mapped {@link RSocketRequester} if any.
121 	 * @return the mapped {@link RSocketRequester} or null.
122 	 */
123 	@Nullable
124 	public RSocketRequester getClientRSocketRequester(Object key) {
125 		return this.clientRSocketRequesters.get(key);
126 	}
127 
128 	@Override
129 	public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {
130 		this.applicationEventPublisher = applicationEventPublisher;
131 	}
132 
133 	void registerHandleConnectionSetupMethod() {
134 		registerHandlerMethod(this, HANDLE_CONNECTION_SETUP_METHOD,
135 				new CompositeMessageCondition(
136 						RSocketFrameTypeMessageCondition.CONNECT_CONDITION,
137 						new DestinationPatternsMessageCondition(new String[] { "*" }, obtainRouteMatcher())));
138 	}
139 
140 	@SuppressWarnings("unused")
141 	private void handleConnectionSetup(Message<DataBuffer> connectMessage) {
142 		DataBuffer dataBuffer = connectMessage.getPayload();
143 		MessageHeaders messageHeaders = connectMessage.getHeaders();
144 		Object rsocketRequesterKey = this.clientRSocketKeyStrategy.apply(messageHeaders, dataBuffer);
145 		RSocketRequester rsocketRequester =
146 				messageHeaders.get(RSocketRequesterMethodArgumentResolver.RSOCKET_REQUESTER_HEADER,
147 						RSocketRequester.class);
148 		this.clientRSocketRequesters.put(rsocketRequesterKey, rsocketRequester);
149 		RSocketConnectedEvent rSocketConnectedEvent =
150 				new RSocketConnectedEvent(this, messageHeaders, dataBuffer, rsocketRequester); // NOSONAR
151 		if (this.applicationEventPublisher != null) {
152 			this.applicationEventPublisher.publishEvent(rSocketConnectedEvent);
153 		}
154 		else {
155 			if (logger.isInfoEnabled()) {
156 				logger.info("The RSocket has been connected: " + rSocketConnectedEvent);
157 			}
158 		}
159 	}
160 
161 }